package com.kool.kmqtt.server.processer;

import com.kool.kmqtt.server.PacketSender;
import com.kool.kmqtt.server.constant.PacketTypeEnum;
import com.kool.kmqtt.server.encoder.PubcompPacketEncoder;
import com.kool.kmqtt.server.exception.ErrorCode;
import com.kool.kmqtt.server.exception.ProtocolException;
import com.kool.kmqtt.server.packet.*;
import com.kool.kmqtt.server.parser.PacketParser;
import io.netty.channel.ChannelHandlerContext;

/**
 * Processor for inbound PUBREL packets (the third packet of the QoS 2 exchange).
 *
 * <p>On PUBREL the processor looks up the stored inbound PUBLISH with the same
 * packet id, forwards it to subscribers, removes it from the repository and
 * replies with PUBCOMP.
 */
public class PubrelPacketProcessor extends PacketProcessor {
    /** Value the fixed-header flags of a PUBREL packet must carry (bits 3..0 = 0,0,1,0). */
    private static final int PUBREL_REQUIRED_FLAGS = 2;

    /** Remaining length of a PUBCOMP packet: only the two-byte packet id follows the fixed header. */
    private static final int PUBCOMP_REMAINING_LENGTH = 2;

    /**
     * Creates the processor; both arguments are handed to the superclass unchanged.
     *
     * @param ctx          Netty channel handler context of the connection
     * @param packetParser parser passed through to {@link PacketProcessor}
     */
    public PubrelPacketProcessor(ChannelHandlerContext ctx, PacketParser packetParser) {
        super(ctx, packetParser);
    }

    /**
     * Checks the reserved flag bits of the PUBREL fixed header.
     *
     * @param packet the received PUBREL packet
     * @throws ProtocolException with {@link ErrorCode#PUBREL_FLAGS_ERROR} when the
     *                           flags are not exactly 0,0,1,0
     */
    @Override
    protected void validate(Packet packet) {
        // Bits 3,2,1,0 of the PUBREL fixed header are reserved and must be 0,0,1,0.
        if (packet.getFixedHeader().getFlags() != PUBREL_REQUIRED_FLAGS) {
            throw new ProtocolException(ErrorCode.PUBREL_FLAGS_ERROR);
        }
    }

    /**
     * Releases the stored inbound PUBLISH identified by the PUBREL packet id and
     * acknowledges with PUBCOMP.
     *
     * <p>A PUBCOMP is sent even when no stored PUBLISH is found for the packet id
     * (for example when the sender retransmits PUBREL because an earlier PUBCOMP
     * was lost). In that case nothing is dispatched or deleted.
     *
     * @param packet the received PUBREL packet
     */
    @Override
    protected void processPacket(Packet packet) {
        int packetId = packet.getPacketId();

        // Look up the inbound PUBLISH that has not been released yet.
        // NOTE(review): assumes getReceivePacket returns null when nothing is stored
        // for this id — confirm against the repository implementation.
        Packet publishPacket = repository.getReceivePacket(packetId);

        if (publishPacket != null) {
            // Forward the PUBLISH to subscribers, then drop it from the inbound store
            // so that a repeated PUBREL cannot dispatch it a second time.
            PublishUtil.sendPublishToSubscribers(publishPacket);
            repository.deleteReceivePacket(packetId);
        }

        // Always complete the handshake, whether or not a PUBLISH was found.
        sendPubcomp(packetId);
    }

    /**
     * Builds a PUBCOMP packet for the given packet id and sends it through a
     * {@link PacketSender} bound to this processor's session context.
     *
     * @param packetId packet id copied from the PUBREL being acknowledged
     */
    private void sendPubcomp(int packetId) {
        FixedHeader fixedHeader = new FixedHeader();
        fixedHeader.setPacketType(PacketTypeEnum.PUBCOMP.getCode());
        fixedHeader.setFlags(0);
        fixedHeader.setRemainingLength(PUBCOMP_REMAINING_LENGTH);

        PubcompVariableHeader variableHeader = new PubcompVariableHeader();
        variableHeader.setPacketId(packetId);

        Packet pubcomp = new Packet();
        pubcomp.setFixedHeader(fixedHeader);
        pubcomp.setPacketId(packetId);
        pubcomp.setVariableHeader(variableHeader);

        PacketSender packetSender = new PacketSender(sessionContext, new PubcompPacketEncoder());
        packetSender.send(pubcomp);
    }
}
